收发定时消息和延时消息

本文提供使用社区版TCP协议下的Java SDK收发定时消息和延时消息的示例代码。

背景信息

  • 定时消息:Producer将消息发送到云消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
  • 延时消息:Producer将消息发送到云消息队列 RocketMQ 版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

更多信息,请参见定时和延时消息

重要

社区版的Apache RocketMQ和阿里云云消息队列 RocketMQ 版配置方式不同,实现效果也有所差异。社区版的Apache RocketMQ支持延时消息,但不支持定时消息,因此没有专门的定时消息接口。阿里云云消息队列 RocketMQ 版不仅同时支持配置延时消息和定时消息,并且定时和延时时间可以精确到秒级、拥有更高的并发性。建议您优先使用云上定时延时的方式,使用方法,请参考以下步骤。

前提条件

您已完成以下操作:

发送定时消息或延时消息

发送定时消息或延时消息的示例代码如下。

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
     * 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }

    public static void main(String[] args) throws MQClientException {
        /**
         *创建Producer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
         *如果不想开启消息轨迹,可以按照如下方式创建:
         *DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
        /**
         *设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        /**
         *设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。
         */
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                /*设置为您在消息队列RocketMQ版控制台创建的Topic。*/
                Message msg = new Message("YOUR TOPIC",
                        /*设置消息的Tag。*/
                        "YOUR MESSAGE TAG",
                        /*消息内容。*/
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                /*发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。*/
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                /**
                 *若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。
                 *定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。
                 * long timeStamp = newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                 * msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                 */
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        //在应用退出前,销毁Producer对象。
        //注意:如果不销毁也没有问题。
        producer.shutdown();
    }
}

订阅定时消息或延时消息

订阅定时消息或延时消息的示例代码如下。

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
    * 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
    }
  public static void main(String[] args) throws MQClientException {
        /**
         * 创建Consumer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
         * 如果不想开启消息轨迹,可以按照如下方式创建:
         * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
         */
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
        //设置为阿里云消息队列RocketMQ版实例的接入点。
    consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        //阿里云上消息轨迹需要设置为CLOUD方式,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
    consumer.setAccessChannel(AccessChannel.CLOUD);
        // 设置为您在阿里云消息队列RocketMQ版控制台上创建的Topic。
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}